package easykafka

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/Cookie_XiaoD/easykafka/spec"
	"github.com/Shopify/sarama"
	"strings"
	"sync"
)

//Consumer 消息消费者
type Consumer struct {
	//kafka的broker列表
	brokers []string
	//待消费的Topic列表
	topics []string
	//消费者组
	groupName string
	//消费者是否手动提交确认
	manualCommit bool
	//消费者auto.offset.reset模式
	aor spec.AORMode
	//当消费者接收到消息时的处理函数
	msgReceived OnMsgReceived

	//标记当前消费者是否开始消费
	isStarted     bool
	isStartedLock sync.RWMutex

	//当前消费者组会话
	session     sarama.ConsumerGroupSession
	sessionLock sync.RWMutex

	//实际进行消息消费组对象
	group sarama.ConsumerGroup
}

//NewConsumer 创建一个消费者
func NewConsumer(brokers string, topics []string, groupName string, dataMsgReceived OnMsgReceived, options ...ConsumerOption) (*Consumer, error) {
	if strings.TrimSpace(brokers) == "" {
		return nil, errors.New("brokers为空")
	}
	addr := strings.Split(brokers, ",")
	if len(addr) == 0 {
		return nil, errors.New("brokers格式不正确，应该如 127.0.0.1:9092,127.0.0.1:9093")
	}
	for _, s := range addr {
		if strings.TrimSpace(s) == "" {
			return nil, errors.New("存在空broker地址")
		}
	}
	if len(topics) == 0 {
		return nil, errors.New("topics为空")
	}
	for _, topic := range topics {
		if strings.TrimSpace(topic) == "" {
			return nil, errors.New("存在空topic")
		}
	}

	if strings.TrimSpace(groupName) == "" {
		return nil, errors.New("未提供有效的groupName")
	}
	if dataMsgReceived == nil {
		return nil, errors.New("未提供有效的dataMsgReceived")
	}

	cfg := &consumerConfig{
		manualCommit: false,       //默认自动确认消息，此时语义是至多一次
		aor:          spec.Newest, //默认消费最新消息
	}
	for _, v := range options {
		v(cfg)
	}

	config := sarama.NewConfig()

	//是否自动提交已消费的偏移量
	config.Consumer.Offsets.AutoCommit.Enable = !cfg.manualCommit
	//若当前组无消费偏移量记录，则从什么偏移量消费数据
	initial := sarama.OffsetNewest
	if cfg.aor == spec.Earliest {
		initial = sarama.OffsetOldest
	}
	config.Consumer.Offsets.Initial = initial

	if cfg.saslInfo != nil {
		config.Net.SASL.Enable = true
		config.Net.SASL.User = cfg.saslInfo.UserName
		config.Net.SASL.Password = cfg.saslInfo.Password
		config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	}

	returnErrors := cfg.errorHandler != nil
	config.Consumer.Return.Errors = returnErrors

	group, err := sarama.NewConsumerGroup(addr, groupName, config)
	if err != nil {
		return nil, err
	}

	ret := &Consumer{
		brokers:      addr,
		groupName:    groupName,
		topics:       topics,
		aor:          cfg.aor,
		manualCommit: cfg.manualCommit,
		group:        group,
		msgReceived:  dataMsgReceived,
	}

	if returnErrors {
		go func() {
			for e := range group.Errors() {
				//消费过程中若通信中断，此处将有error，若通信恢复则消费自动恢复
				cfg.errorHandler(&ConsumeError{
					Err: e,
				})
			}
		}()
	}
	return ret, nil
}

func (c *Consumer) Confirm(msg spec.Msg) error {
	if err := c.checkManualCommit(); err != nil {
		return err
	}
	if err := c.checkStarted(); err != nil {
		return err
	}
	c.session.MarkOffset(msg.Topic(), msg.Partition(), msg.Offset()+1, "")
	c.session.Commit()
	return nil
}

func (c *Consumer) ConfirmBatch(msgs []spec.Msg) error {
	if len(msgs) == 0 {
		return errors.New("无任何待确认数据")
	}
	if err := c.checkManualCommit(); err != nil {
		return err
	}
	if err := c.checkStarted(); err != nil {
		return err
	}
	if err := c.checkSession(); err != nil {
		return err
	}
	//对于Kafka而言，相同topic下的相同partition下的多个不同的offset，以最后一个确认的为准
	//比如offset的顺序是 1 2 3 4 5 6 7 8 4 ，那么提交的offset不是8而是4
	//所以需要按照topic和partition进行分组，以组中最大的offset+1作为提交依据
	//因为kafka是按照提交的offset作为下次的消费的依据，提交10则下次就从10开始消费，所以+1以便从11开始消费
	groups := make(map[string]spec.Msg)
	for _, v := range msgs {
		if v == nil {
			continue
		}
		key := fmt.Sprintf("%v-%v", v.Topic(), v.Partition())
		m, ok := groups[key]
		if !ok {
			groups[key] = v
			continue
		}
		if v.Offset() > m.Offset() {
			groups[key] = v
		}
	}
	for _, v := range groups {
		c.session.MarkOffset(v.Topic(), v.Partition(), v.Offset()+1, "")
	}
	c.session.Commit()
	return nil
}

func (c *Consumer) StartBlock(ctx context.Context) {
	for {
		handler := defaultConsumerGroupHandler{c: c}
		errChan := make(chan error, 1)
		go func() {
			//与broker断开时这里会返回错误 需通过死循环进行重试
			err := c.group.Consume(context.Background(), c.topics, handler)
			errChan <- err
		}()
		select {
		case <-ctx.Done():
			break
		case <-errChan:
			break
		}
	}
}

func (c *Consumer) Close() (err error) {
	if c.group != nil {
		_ = c.group.Close()
	}
	return nil
}

func (c *Consumer) setStarted(started bool) {
	c.isStartedLock.Lock()
	c.isStarted = started
	c.isStartedLock.Unlock()
}

func (c *Consumer) checkStarted() error {
	c.isStartedLock.RLock()
	defer c.isStartedLock.RUnlock()
	if !c.isStarted {
		return errors.New("未开始消费数据,请先执行StartBlock")
	}
	return nil
}

func (c *Consumer) setSession(sess sarama.ConsumerGroupSession) {
	c.sessionLock.Lock()
	c.session = sess
	c.sessionLock.Unlock()
}

func (c *Consumer) checkSession() error {
	c.sessionLock.RLock()
	defer c.sessionLock.RUnlock()
	if c.session == nil {
		return errors.New("无有效Session")
	}
	return nil
}

func (c *Consumer) checkManualCommit() error {
	if !c.manualCommit {
		return errors.New("自动提交模式不能手动确认")
	}
	return nil
}

//consumerConfig 消费者配置
type consumerConfig struct {
	saslInfo     *spec.SASLInfo
	aor          spec.AORMode
	manualCommit bool
	errorHandler OnErrorOccurred
}

//ConsumerOption 消费者配置选项
type ConsumerOption func(*consumerConfig)

//WithConsumerSASL 指定消费者的SASL验证信息
func WithConsumerSASL(sasl spec.SASLInfo) ConsumerOption {
	return func(cfg *consumerConfig) {
		cfg.saslInfo = &sasl
	}
}

//WithConsumerAOR 指定消费者的自动偏移量重置方式 默认为Newest
func WithConsumerAOR(aorType spec.AORMode) ConsumerOption {
	return func(cfg *consumerConfig) {
		cfg.aor = aorType
	}
}

//WithConsumerManualCommit 指定消费者是否手动确认消息 默认为自动确认
func WithConsumerManualCommit(manual bool) ConsumerOption {
	return func(cfg *consumerConfig) {
		cfg.manualCommit = manual
	}
}

//WithConsumerErrorHandler 指定消费者错误处理器
func WithConsumerErrorHandler(h OnErrorOccurred) ConsumerOption {
	return func(cfg *consumerConfig) {
		cfg.errorHandler = h
	}
}

//ConsumeError 消费错误
type ConsumeError struct {
	Err error
}

//OnErrorOccurred 表示发生一个消费错误
type OnErrorOccurred func(err *ConsumeError)

//OnMsgReceived 表示收到一个消息
type OnMsgReceived func(msg spec.Msg)

//进行实际的消费处理
type defaultConsumerGroupHandler struct {
	c *Consumer
}

//Setup在新的会话开始前执行 早于ConsumeClaim
func (defaultConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

//Cleanup在会话结束时执行
func (defaultConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
func (h defaultConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	//当与broker断开连接，重连并且消费到新消息时会重新进入ConsumeClaim
	//断开过程中收到的消息无法进行确认（会失败，但API不会返回错误） 只能待恢复后用新会话进行提交
	h.c.setSession(sess)
	h.c.setStarted(true)
	for msg := range claim.Messages() {
		//将收到的消息保证后处理
		dm := &MsgData{
			session: sess,
			msg:     msg,
		}
		if !h.c.manualCommit {
			//自动提交模式下每收到一条消息就进行标记，保证自动Commit时可以确认收到的消息
			sess.MarkMessage(msg, "")
		}
		h.c.msgReceived(dm)
	}
	h.c.setStarted(false)
	return nil
}
